---
title: Lakehouse | Dagster
description: Lakehouse is set of APIs for defining pipelines that puts assets, like database tables and ML models, at the center.
---

# Dagster Lakehouse <Experimental />

<CodeReferenceLink filePath="examples/simple_lakehouse" />

Lakehouse is set of APIs for defining pipelines that puts "assets", like database tables and ML
models, at the center.

Lakehouse is built on top of Dagster's core abstractions, and is currently an experimental API.

In this example, we'll define some tables and generate a Dagster pipeline that updates them. We
have a table of temperature samples collected in five-minute increments, and we want to compute a
table of the highest temperatures for each day.

## Data Assets

Here are our asset (aka table) definitions.

```python file=../../simple_lakehouse/simple_lakehouse/assets.py
"""Asset definitions for the simple_lakehouse example."""
import pandas as pd
from lakehouse import Column, computed_table, source_table
from pyarrow import date32, float64, string

sfo_q2_weather_sample_table = source_table(
    path="data",
    columns=[Column("tmpf", float64()), Column("valid_date", string())],
)


@computed_table(
    input_assets=[sfo_q2_weather_sample_table],
    columns=[Column("valid_date", date32()), Column("max_tmpf", float64())],
)
def daily_temperature_highs_table(sfo_q2_weather_sample: pd.DataFrame) -> pd.DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})
```

They're pure functions that describe how the asset is derived from parent assets.
They intentionally omit code for storing and retrieving the assets, because that code often varies
across environments - e.g. we might want to store data in a local csv file for easy testing, but
store data a data warehouse in production.

`sfo_q2_weather_sample_table` represents our base temperature table. `"filesystem"` is a name we
have chosen to identify the storage system where this table lives. The `path` argument gives
the path to the data asset itself within that storage system.

`daily_temperature_highs_table` represents our computed asset. We explicitly define the dependency
on the original table by passing `sfo_q2_weather_sample_table` as the value for the `input_deps`
argument.

## Storage

```python file=../../simple_lakehouse/simple_lakehouse/lakehouse_def.py
"""
This defines a Lakehouse with local storage and Pandas data processing.

Data is locally stored in csv files.

Pandas is used for data processing.  Data can be read from CSV files into a
pandas dataframe, and exported back into pandas dataframes.
"""
import os
from typing import Tuple

import pandas as pd
from dagster import ModeDefinition, StringSource, resource
from lakehouse import AssetStorage, Lakehouse


class LocalFileSystemStorage(AssetStorage):
    def __init__(self, root):
        self._root = root

    def _get_fs_path(self, path: Tuple[str, ...]) -> str:
        rpath = os.path.join(self._root, *path) + ".csv"
        return os.path.abspath(rpath)

    def save(self, obj: pd.DataFrame, path: Tuple[str, ...], _resources) -> None:
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(path)
        obj.to_csv(fpath)

    def load(self, _python_type, path: Tuple[str, ...], _resources):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(path)
        return pd.read_csv(fpath)


@resource(config_schema={"root": StringSource})
def local_fs_storage(init_context):
    return LocalFileSystemStorage(init_context.resource_config["root"])


simple_lakehouse = Lakehouse(
    mode_defs=[
        ModeDefinition(
            resource_defs={"default_storage": local_fs_storage.configured({"root": "."})},
        )
    ]
)
```

We want to persist the data to disk using csv files. Then, we need to create an `AssetStorage` to
describe the conversion between pandas dataframes and csv files.

The `load` function converts inputs to the required format for an asset. Since our base asset will
represent a csv file, and our second asset will be processing a pandas dataframe, `load` will
convert a csv to a dataframe. Likewise, we want to persist the results of our
second asset as a csv file, so our `save` method converts a pandas dataframe to a csv.

Then, we construct our `Lakehouse`, which delegates conversion and storage between assets by
utilizing the `AssetStorage` we just defined.

## Pipeline

The data assets, combined with the storage for handling conversion between data formats, completely
define a computation graph. As a result, we can use the assets and storage to construct a pipeline.

```python file=../../simple_lakehouse/simple_lakehouse/pipelines.py
"""Pipeline definitions for the simple_lakehouse example.
"""
from simple_lakehouse.assets import daily_temperature_highs_table
from simple_lakehouse.lakehouse_def import simple_lakehouse

computed_assets = [daily_temperature_highs_table]
simple_lakehouse_pipeline = simple_lakehouse.build_pipeline_definition(
    "simple_lakehouse_pipeline",
    computed_assets,
)
```

Note that the assets don't have to be provided in order. Lakehouse is able to determine asset
ordering by resolving input asset dependencies.
